54ab622b5fbfed70e27caef4c00c854757b1b9ab,src/main/java/com/github/ddth/kafka/internal/KafkaConsumerWorker.java,KafkaConsumerWorker,run,#,36

Before Change


                    }
                }
            }
            Thread.yield();
        }
    }
}

After Change


                    mm = it.next();
                }
            }
            if (msgListeners.size() > 0 && mm != null) {
                final String topic = mm.topic();
                final int partition = mm.partition();
                final long offset = mm.offset();
                final byte[] key = mm.key();
                final byte[] message = mm.message();
                final CountDownLatch countDownLatch = new CountDownLatch(msgListeners.size());
                for (final IKafkaMessageListener listerner : messageListerners) {
                    Thread t = new Thread("Kafka-Consumer-Delivery") {
                        public void run() {
                            try {
                                listerner.onMessage(topic, partition, offset, key, message);
                            } catch (Exception e) {
                                LOGGER.warn(e.getMessage(), e);
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    };
                    t.start();
                }
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    LOGGER.warn(e.getMessage(), e);
                }
            } else {
                Thread.yield();
            }
        }
    }